home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Chip 2000 October
/
CHIP Turkiye Ekim 2000.iso
/
prog
/
naps
/
04
/
setup.exe
/
Gnucleus
/
GnuSock.cpp
< prev
next >
Wrap
C/C++ Source or Header
|
2000-07-15
|
28KB
|
1,277 lines
/********************************************************************************
Gnucleus - A node application for the Gnutella network
Copyright (C) 2000 John Marshall
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
For support, questions, comments, etc...
E-Mail:
swabby@c0re.net
Address:
21 Cadogan Way
Nashua, NH, USA 03062
********************************************************************************/
// GnuSock.cpp : implementation file
//
#include "stdafx.h"
#include "Packet.h"
#include "Gnucleus.h"
#include "GnucleusDoc.h"
#include "ViewSearch.h"
#include "ViewSearchSpy.h"
#include "ViewTransfer.h"
#include "GnuTransfer.h"
#include "GnuHash.h"
#include "GnuControl.h"
#include "GnuSock.h"
#include "IPFilter.h"
#include <algorithm>
#include <queue>
#include <process.h>
#ifdef _DEBUG
#define new DEBUG_NEW
#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#endif
/////////////////////////////////////////////////////////////////////////////
// CGnuSock
// Default constructor: resets every member to its initial state.
// The owning control object (GnuComm) is left NULL by Initialize().
CGnuSock::CGnuSock()
{
    this->Initialize();
}
// Constructs a socket that reports to the given control object.
//   pGnuComm - owning control object; stored, not copied or deleted here.
CGnuSock::CGnuSock(CGnuControl *pGnuComm)
{
    // Initialize() sets GnuComm to NULL, so the owner has to be assigned
    // after the reset, not before.
    this->Initialize();
    this->GnuComm = pGnuComm;

    //LogFile = new CFile("C:\\Packet DEBUG.txt", CFile::modeCreate | CFile::modeWrite);
}
// Puts every member into its start-up state. Called by both constructors.
void CGnuSock::Initialize ()
{
    // Owner and list linkage
    GnuComm = NULL;
    next    = NULL;

    // Remote endpoint is unknown until a connection is made
    Host = "";
    Port = "";

    // All logging switches off
    logPings        = 0;
    logPongs        = 0;
    logQueries      = 0;
    logPushes       = 0;
    logQueryReplies = 0;
    logUnknowns     = 0;
    logGUID         = 0;
    logHops         = 0;
    logTTL          = 0;
    logPayload      = 0;
    logDropped      = 0;

    Connected = 0;

    // No partial packet carried over from a previous receive
    mExtraBytes       = 0;
    mExtraBytesStored = NULL;

    // Packet counters
    m_dwNumOfBadPackets     = 0;
    m_dwNumOfRecivedPackets = 0;
    m_dwNumOfSentPackets    = 0;

    // Byte counters, bandwidth allotments and outgoing queue size
    m_dwBytesIn           = 0;
    m_dwBytesOut          = 0;
    m_dwTotalBytesIn      = 0;
    m_dwTotalBytesOut     = 0;
    m_dwByteAllottmentOut = 0;
    m_dwByteAllottmentIn  = 0;
    m_iSizeOfPacketQueue  = 0;

    m_timeLastHeardAT = CTime::GetCurrentTime();
    m_CanRelease      = false;

    // One remote-host counter per hop distance (slots 0..7)
    int slot = 0;
    while (slot < 8)
    {
        RemoteHosts[slot] = 0;
        ++slot;
    }
}
// Destructor: releases the stored partial-packet bytes and any outgoing
// packets still waiting in the throttling queue.
// GnuComm and next are not owned by this object and are not deleted.
CGnuSock::~CGnuSock()
{
    delete [] mExtraBytesStored;
    //delete GnuComm;
    //delete next;

    // Clean out fifo queue.
    // Send() allocates each queued buffer with new BYTE[] and stores it as a
    // void*. Deleting through void* (and without []) is undefined behaviour,
    // so cast back to the allocated type and use the array form.
    while(!m_PacketsOutQueue.empty())
    {
        delete [] static_cast<BYTE *>(m_PacketsOutQueue.front());
        m_PacketsOutQueue.pop();
    }
}
// Do not edit the following lines, which are needed by ClassWizard.
#if 0
BEGIN_MESSAGE_MAP(CGnuSock, CAsyncSocket)
//{{AFX_MSG_MAP(CGnuSock)
//}}AFX_MSG_MAP
END_MESSAGE_MAP()
#endif // 0
/////////////////////////////////////////////////////////////////////////////
// CGnuSock member functions
// CAsyncSocket overrides
void CGnuSock::OnConnect(int nErrorCode)
{
if (nErrorCode == 0)
{
Send("GNUTELLA CONNECT/0.4\n\n\0", 22);
GnuComm->BytesOut += 22;
m_dwNumOfSentPackets++;
Log += "*** Connected to node, waiting for reply.\r\r\n";
// Update the local host
UINT Trash;
GetSockName(GnuComm->localHost, Trash);
}
CAsyncSocket::OnConnect(nErrorCode);
}
void CGnuSock::OnReceive(int nErrorCode)
{
byte *pBuff = new byte[132000];
DWORD length = 0;
int iReceiveLength = 0, iErrorCode = 0;
if (mExtraBytes)
{
ASSERT(mExtraBytesStored);
memcpy(pBuff, mExtraBytesStored, mExtraBytes);
length = mExtraBytes;
}
iReceiveLength = Receive(&pBuff[length], 132000 - length);
switch (iReceiveLength)
{
case 0:
// Connection has been closed, shutdown
WantToDisconnect();
m_CanRelease = true;
CleanUp();
break;
case SOCKET_ERROR:
OutputDebugString("CGnuSock::OnReceive - SOCKET_ERROR\n");
// received an error, see what it is and handle.
switch(GetLastError())
{
// fatal errors, need to disconnect
default:
case WSAECONNRESET: // The virtual circuit was reset by the remote side.
case WSAENOTCONN: // The socket is not connected.
case WSAENOTSOCK: // The descriptor is not a socket.
case WSAESHUTDOWN: // The socket has been shut down
case WSAECONNABORTED: // The virtual circuit was aborted due to timeout or other failure.
case WSAEWOULDBLOCK: //The socket is marked as nonblocking and the Receive operation would block. Shouldn't happen.
case WSAENETDOWN: // detected that the network subsystem failed
WantToDisconnect();
m_CanRelease = true;
CleanUp();
break;
// non fatal errors, just handle individualy
case WSAEMSGSIZE: // The datagram was too large to fit into the specified buffer and was truncated.
// Someone is probably sending us invalid packets, dump them.
WantToDisconnect();
m_CanRelease = true;
CleanUp();
break;
}
return;
break;
default: // just normal data
m_timeLastHeardAT = CTime::GetCurrentTime();
length += iReceiveLength;
}
m_dwBytesIn += length - mExtraBytes;
GnuComm->BytesIn += length - mExtraBytes;
// We moved the previous extra data to the beginning
// of the current data burst, so release the temp storage.
mExtraBytes = 0;
delete [] mExtraBytesStored;
mExtraBytesStored = NULL;
char *first = (char*) pBuff, *last = (char*) pBuff + length;
char *firstLineFeed = std::find (first, last, '\n');
std::string firstLine;
if (firstLineFeed != last)
{
firstLine.assign (first, firstLineFeed);
}
// Check for a file request
if(!Connected)
{
if(firstLine.find( "GET /get/") != -1)
{
CString header(pBuff);
SOCKET sock = Detach ();
WantToDisconnect();
m_CanRelease = true;
CleanUp();
delete [] pBuff;
((CViewTransfer *) ((CGnucleusApp *) AfxGetApp())->TransferFrame->GetActiveView())->NewUpload(header, sock);
return;
};
}
// Check for PUSHes
if(!Connected)
{
std::string pushString ("GIV ");
if (!firstLine.compare (0, pushString.size(), pushString))
{
// This is a PUSH response, not a normal client connection. Give this to the control object
// and let it set up a transfer object to deal with it.
SOCKET sock = Detach ();
WantToDisconnect();
m_CanRelease = true;
CleanUp();
delete [] pBuff;
((CViewTransfer *) ((CGnucleusApp *) AfxGetApp())->TransferFrame->GetActiveView())->NewPushDownload(firstLine, sock);
return;
}
}
// Check for connection reply
if(!Connected)
{
if (!firstLine.compare ("GNUTELLA CONNECT/0.4"))
Send_ConnectOK();
else if (!firstLine.compare ("GNUTELLA OK"))
Recieve_ConnectOK();
// if not connected now, then disconnect
if(!Connected)
{
WantToDisconnect();
CleanUp();
}
}
if(Connected)
{
mExtraBytes = SplitBundle(pBuff, length);
if (0 != mExtraBytes)
{
// We shouldn't have almost a full 132000 byte
// buffer unprocessed!
ASSERT(130000 > mExtraBytes);
mExtraBytesStored = new byte[mExtraBytes];
memcpy(mExtraBytesStored,
&pBuff[length - mExtraBytes], mExtraBytes);
}
}
delete [] pBuff;
CAsyncSocket::OnReceive(nErrorCode);
}
void CGnuSock::OnClose(int nErrorCode)
{
m_CanRelease = true; // Socket has closed, definitely safe to release now
// clean out the buffer
char * pBuf = new char[101];
int iRet = 0;
while( (iRet = Receive(pBuf, 100)) != 0 && iRet != SOCKET_ERROR );
delete [] pBuf;
CleanUp();
GnuComm->RemoveNode(Host, Port);
CAsyncSocket::OnClose(nErrorCode);
}
// Member Functions
void CGnuSock::Recieve_ConnectOK()
{
// Add Node to ListBox
if( SpaceAvailable() )
{
CString HostPort = Host;
HostPort += ":";
HostPort += Port;
GnuComm->Doc->QueueConnect.AddTail(HostPort);
GnuComm->Doc->Connections.AddTail(HostPort);
Connected = 1;
Log += "*** Reply from node recieved, connection established.\r\r\n";
}
}
void CGnuSock::Send_ConnectOK()
{
if( SpaceAvailable() )
{
Connected = 1;
Send("GNUTELLA OK\n\n\0", 13);
GnuComm->BytesOut += 13;
// Add Node to ListBox
CString HostPort = Host;
HostPort += ":Inbound";
GnuComm->Doc->QueueConnect.AddTail(HostPort);
GnuComm->Doc->Connections.AddTail(HostPort);
Log += "*** Reply from node recieved, connection established.\r\r\n";
m_dwNumOfSentPackets++;
}
}
bool CGnuSock::SpaceAvailable()
{
if(GnuComm->Doc->m_DropForIncoming || GnuComm->Doc->m_MonitorType == 0)
{
return 1;
}
else if(GnuComm->Doc->m_MonitorType == 3 ||
(GnuComm->Doc->m_MonitorType == 2 && GnuComm->Doc->Connections.GetCount() < GnuComm->Doc->m_ConnectNum) ||
(GnuComm->Doc->m_MonitorType == 1 && GnuComm->Doc->Connections.GetCount() < GnuComm->Doc->m_ConnectNum - 1))
{
return 1;
}
else
{
WantToDisconnect();
m_CanRelease = true;
CleanUp();
}
return 0;
}
//
// MWD 05-12-2000 START
// Here's the latest SplitBundle. It checks whether there's enough
// data in the buffer at each decoding point to see whether it should
// continue or process an error. If it detects an incomplete packet,
// it returns to the caller the number of extra bytes so that it can
// store them off for the next bundle.
// Walks a receive buffer, dispatching every complete, plausible packet to
// HandlePacket().
//   bundle - start of the received bytes
//   length - number of valid bytes in bundle
// Returns the number of bytes it reports as unprocessed (length - nextPos);
// OnReceive stores that many bytes from the end of its buffer for the next call.
int CGnuSock::SplitBundle(byte *bundle, DWORD length)
{
// Causing problems
//ASSERT(sizeof(packet_Header) <= length);
DWORD Payload;
DWORD nextPos = 0; // offset of the next candidate header within bundle
packet_Header *packet;
enum status
{
status_DONE,
status_CONTINUE,
status_BAD_PACKET,
status_INCOMPLETE_PACKET
};
status theStatus = status_CONTINUE;
do
{
// Not enough bytes left for a header: wait for more data
if ((nextPos + sizeof(packet_Header)) > length)
theStatus = status_INCOMPLETE_PACKET;
else
{
packet = (packet_Header *) (bundle + nextPos);
Payload = makeD( flipX( packet->Payload ) );
// Accept only the function codes HandlePacket() dispatches (0x00 ping,
// 0x01 pong, 0x40 push, 0x80 query, 0x81 query reply), each with the
// payload size or range allowed for it here.
if((packet->Function == 0x00 && Payload == 0) ||
(packet->Function == 0x01 && Payload == 14) ||
(packet->Function == 0x40 && Payload == 26) ||
(packet->Function == 0x80 && Payload >= 2 && Payload <= 257) ||
(packet->Function == 0x81 && Payload >= 27 && Payload <= 67075))
{
if ((nextPos + sizeof(packet_Header) + Payload) <= length)
{
// Whole packet is present. NOTE(review): the literal 23 is used here while
// the bounds checks use sizeof(packet_Header) - presumably equal; confirm.
HandlePacket(packet, 23 + Payload);
nextPos += 23 + Payload;
if (nextPos == length)
theStatus = status_DONE;
}
else
theStatus = status_INCOMPLETE_PACKET;
}
else
{
/* DW
theStatus = status_BAD_PACKET;
// If we had a bad packet, just throw it all out.
nextPos = length;
m_dwNumOfBadPackets++;
*/
// !!** DW - Dean Wyant 6/19/00
// The above loses all pending messages in the buffer
// Let's just find the next possible header
theStatus = status_CONTINUE;
if (nextPos < (length - sizeof(packet_Header)))
{
// Skip one byte and retry.
// NOTE(review): length is decremented as well, so the scan window shrinks
// from both ends and the returned byte count no longer lines up with
// nextPos in the caller's buffer - confirm this is intended.
nextPos++;
length--;
}
else
{ // No good header found - but leave the header bytes for next time
theStatus = status_BAD_PACKET;
m_dwNumOfBadPackets++;
}
}
}
} while (status_CONTINUE == theStatus);
// Optional logging of the final status
if (logUnknowns)
{
switch (theStatus)
{
case status_DONE:
break;
case status_BAD_PACKET:
Log += "Bad packet\r\r\n";
break;
case status_INCOMPLETE_PACKET:
{
//Log += "Extra ";
//char buffer[20];
//itoa(length - nextPos, buffer, 10);
//Log += buffer;
//Log += " Remaining\r\r\n";
}
break;
case status_CONTINUE:
// This should never happen.
ASSERT(FALSE);
break;
default:
break;
}
}
return length - nextPos;
}
// MWD 05-12-2000 END
//
// Dispatches one complete packet to the handler for its function code and
// updates the per-socket and global packet counters.
//   packet - start of the packet (header first)
//   length - total packet size in bytes, header included
void CGnuSock::HandlePacket(packet_Header *packet, int length)
{
    bool recognised = true;

    switch(packet->Function)
    {
    case 0x00:
        Recieve_Ping((packet_Ping *) packet, length);
        GnuComm->totalPings++;
        break;

    case 0x01:
        Recieve_Pong((packet_Pong *) packet, length);
        GnuComm->totalPongs++;
        break;

    case 0x40:
        Recieve_Push((packet_Push *) packet, length);
        GnuComm->totalPushes++;
        break;

    case 0x80:
        Recieve_Query((packet_Query *) packet, length);
        GnuComm->totalQueries++;
        break;

    case 0x81:
        Recieve_QueryReply((packet_QueryReply *) packet, length);
        GnuComm->totalQueryReplies++;
        break;

    default:
        // Unknown function code: counted as a bad packet
        recognised = false;
        Recieve_Unknown((byte *) packet, length);
        m_dwNumOfBadPackets++;
        GnuComm->totalUnknowns++;
        break;
    }

    if (recognised)
        m_dwNumOfRecivedPackets++;
}
// Common per-packet bookkeeping: bumps the hop count, updates the hop/TTL
// statistics on the control object and clamps an oversized TTL.
//   packet - header to inspect; Hops and possibly TTL are modified in place
//   length - not used
// Returns false when the packet has now made more than 7 hops and must be
// dropped, true otherwise.
bool CGnuSock::InspectPacket(packet_Header *packet, int length)
{
    // Increment hops of packet first thing
    packet->Hops++;

    const bool tooManyHops = packet->Hops > 7;
    const bool ttlTooLarge = packet->TTL > 7;

    // Add data to the statistic values
    if (tooManyHops)
        GnuComm->totalHopsBig++;
    else
        GnuComm->totalHops++;

    if (ttlTooLarge)
        GnuComm->totalTTLsBig++;
    else
        GnuComm->totalTTLs++;

    // If packet has hopped more than 7 times kill it
    if (tooManyHops)
        return false;

    // Reset TTL of packet if it needs to be
    if (ttlTooLarge)
        packet->TTL = 7 - packet->Hops;

    return true;
}
// Handles an incoming ping.
//   Ping   - the packet; its header is modified in place (hops / TTL)
//   length - total packet size in bytes
// A ping whose GUID has not been seen is recorded in the GUID cache against
// this socket, broadcast through the control object and answered with a pong.
// A ping whose GUID is already cached is dropped.
void CGnuSock::Recieve_Ping(packet_Ping *Ping, int length)
{
    if(logPings)
        Log += "Ping";

    if(!InspectPacket((packet_Header *) Ping, length))
    {
        if(logPings)
            Log += ", dropped (hops expired)\r\r\n";
        return;
    }

    // Check its TTL and pass it along
    if(Ping->Header.TTL == 0)
    {
        if(logPings)
            Log += ", dropped (TTL expired)\r\r\n";
        return;
    }
    Ping->Header.TTL--;

    // Check the GUID of the packet to make sure no duplicates
    CGnuControl::GuidHash::iterator seen = GnuComm->m_cache.find (Ping->Header.Guid);
    if (seen != GnuComm->m_cache.end ())
    {
        // Already cached: "duplicate" if it came through this socket before,
        // "routing" if it is cached against another one.
        if(logPings)
            Log += ((*seen).second == this) ? ", dropped (duplicate)\r\r\n"
                                            : ", dropped (routing)\r\r\n";
        return;
    }

    GnuComm->m_cache[Ping->Header.Guid] = this;
    GnuComm->Broadcast_Ping(Ping, length, this);
    Send_Pong(Ping->Header.Guid, Ping->Header.Hops);

    if(logPings)
        Log += "\r\r\n";
}
// Handles an incoming pong.
//   Pong   - the packet; its header is modified in place (hops / TTL)
//   length - total packet size in bytes
// The advertised host is offered to the document's host cache when it has a
// non-zero port and passes the IP filter checks. A pong whose GUID is in the
// send table is counted in RemoteHosts by hop distance; a pong whose GUID is
// in the GUID cache is routed on through the control object; anything else is
// dropped.
void CGnuSock::Recieve_Pong(packet_Pong *Pong, int length)
{
    if(logPongs)
        Log += "Pong";

    if(!InspectPacket((packet_Header *) Pong, length))
    {
        if(logPongs)
            Log += ", dropped (hops expired)\r\r\n";
        return;
    }

    if(Pong->Host.a == 0 && Pong->Host.b == 0 && Pong->Host.c == 0 && Pong->Host.d == 0)
    {
        // Fix: this used to log "TTL expired" (copy-paste); the push and
        // query-reply handlers log "null host" for the same condition.
        if(logPongs)
            Log += ", dropped (null host)\r\r\n";
        return;
    }

    // Check its TTL and pass it along
    if(Pong->Header.TTL == 0)
    {
        if(logPongs)
            Log += ", dropped (TTL expired)\r\r\n";
        return;
    }
    else
        Pong->Header.TTL--;

    // Add pong to host cache
    CString HostPort = IPtoStr(Pong->Host);
    HostPort += ":";
    HostPort += WrdtoStr(Pong->Port);

    if ((0 != Pong->Port) &&
        CIPFilter::AllowIP(Pong->Host) &&
        CIPFilter::IsPrivateIP(Pong->Host, Pong->Host) == 0)
    {
        GnuComm->Doc->AddToQueueCache(HostPort);
    }

    /*
    while(GnuComm->Doc->QueueCache.GetCount() > 500)
    GnuComm->Doc->QueueCache.RemoveTail();
    if(Pong->Port != 0)
    {
    // Don't add duplicate hosts
    if (!GnuComm->Doc->QueueCache.Find (HostPort))
    {
    if (CIPFilter::AllowIP (Pong->Host.a, Pong->Host.b, Pong->Host.c, Pong->Host.d))
    {
    GnuComm->Doc->QueueCache.AddTail( HostPort);
    }
    }
    }
    */

    // Check the GUID of the packet to make sure no duplicates
    CGnuControl::GuidHash::iterator it = GnuComm->m_cache.find (Pong->Header.Guid);
    key_data *sendKey = GnuComm->SendTable.FindValue(&Pong->Header.Guid);

    if(sendKey != NULL)
    {
        // GUID is in the send table: count the host by hop distance.
        // InspectPacket() has already rejected Hops > 7, so the index stays
        // inside the 8 slots that Initialize() clears.
        if(logPongs)
            Log += " recieved\r\r\n";
        RemoteHosts[Pong->Header.Hops]++;
        return;
    }

    if(it != GnuComm->m_cache.end ())
    {
        // GUID is cached: route the pong using the cached origin
        key_data key;
        key.Guid = Pong->Header.Guid;
        key.Origin = (*it).second;
        GnuComm->Route_Pong(Pong, length, &key);
    }
    else
    {
        if(logPongs)
            Log += ", dropped (routing)\r\r\n";
        return;
    }

    if(logPongs)
        Log += "\r\r\n";
}
// Handles an incoming push request.
//   Push   - the packet; its header is modified in place (hops / TTL)
//   length - total packet size in bytes
// A push addressed to our own client GUID creates an upload transfer and
// connects it to the requesting host. Any other push is routed through the
// control object when its client ID is in the client hash table, and dropped
// when it is not.
void CGnuSock::Recieve_Push(packet_Push *Push, int length)
{
    if(logPushes)
        Log += "Push";

    if(!InspectPacket((packet_Header *) Push, length))
    {
        if(logPushes)
            Log += ", dropped (hops expired)\r\r\n";
        return;
    }

    // After thinking about this its probably not the best to censor other people's push requests
    /*if (!CIPFilter::AllowIP (Push->Host.a, Push->Host.b, Push->Host.c, Push->Host.d))
    {
    if(logPushes)
    Log += ", dropped (IP filter)\r\r\n";
    return;
    }*/

    const bool nullHost = Push->Host.a == 0 && Push->Host.b == 0 &&
                          Push->Host.c == 0 && Push->Host.d == 0;
    if(nullHost)
    {
        if(logPushes)
            Log += ", dropped (null host)\r\r\n";
        return;
    }

    // Check its TTL and pass it along
    if(Push->Header.TTL == 0)
    {
        if(logPushes)
            Log += ", dropped (TTL expired)\r\r\n";
        return;
    }
    Push->Header.TTL--;

    // Check to see if the push is for us
    if(GnuComm->ClientGUID == Push->ClientID)
    {
        if(logPushes)
            Log += " received\r\r\n";

        // Describe the requested file as an upload and start connecting
        QueryItem upload;
        upload.Handle       = 16000;
        upload.Index        = makeD( flipX(Push->Index) );
        upload.Guid         = Push->ClientID;
        upload.Host         = Push->Host;
        upload.Port         = Push->Port;
        upload.Status       = "Push";
        upload.TransferType = 'U';

        GnuComm->AddTransfer(upload);
        GnuComm->GetTransfer(16000, 'U')->Connect( IPtoStr(upload.Host), upload.Port);
        return;
    }

    // Not for us: look the client ID up in the client hash table
    key_data *route = GnuComm->HashClientTable.FindValue(&Push->ClientID);
    if(route == NULL)
    {
        if(logPushes)
            Log += ", dropped (routing)\r\r\n";
        return;
    }

    GnuComm->Route_Push(Push, length, route);

    if(logPushes)
        Log += "\r\r\n";
}
// Handles an incoming search query.
//   Query  - the packet; its header is modified in place (hops / TTL)
//   length - total packet size in bytes
// A query whose GUID has not been seen is cached against this socket and
// broadcast, its search text is shown in the spy view, and this socket is
// queued on the control object with CurrentOrigin / CurrentSearch set.
// A query whose GUID is already cached is dropped.
void CGnuSock::Recieve_Query(packet_Query *Query, int length)
{
    if(logQueries)
        Log += "Query";

    if(!InspectPacket((packet_Header *) Query, length))
    {
        if(logQueries)
            Log += ", dropped (hops expired)\r\r\n";
        return;
    }

    // Check its TTL and pass it along
    if(Query->Header.TTL == 0)
    {
        if(logQueries)
            Log += ", dropped (TTL expired)\r\r\n";
        return;
    }
    Query->Header.TTL--;

    // Check the GUID of the packet to make sure no duplicates
    CGnuControl::GuidHash::iterator seen = GnuComm->m_cache.find (Query->Header.Guid);
    if(seen != GnuComm->m_cache.end ())
    {
        // Already cached: "duplicate" if it came through this socket before,
        // "routing" if it is cached against another one.
        if(logQueries)
            Log += ((*seen).second == this) ? ", dropped (duplicate)\r\r\n"
                                            : ", dropped (routing)\r\r\n";
        return;
    }

    GnuComm->m_cache[Query->Header.Guid] = this;
    GnuComm->Broadcast_Query(Query, length, this);

    // Extract the query: text starts at byte 25 and runs to the first NUL or
    // to the end of the payload, whichever comes first
    BYTE *RawQuery = (BYTE *) Query;
    CString SearchString;
    for(int i = 25;
        i < 23 + makeD(flipX(Query->Header.Payload)) && RawQuery[i] != '\0';
        i++)
    {
        SearchString += RawQuery[i];
    }

    if(logQueries)
        Log += "\r\r\n";

    //
    // Log search request to spy window.
    //
    GnuComm->Doc->UpdateSpy( SearchString );

    CurrentOrigin = Query->Header.Guid;
    CurrentSearch = SearchString;
    GnuComm->PushQueryToQueue(this);
    //Log += "\r\r\n";
}
// Handles an incoming query hit.
//   QueryReply - the packet; its header may be modified in place (hops / TTL)
//   length     - total packet size in bytes; the last 16 bytes are read as
//                the replying client's GUID
// A reply whose GUID is in the send table is shown in the search views (when
// its host passes the IP filter) and the client GUID is recorded against this
// socket. Any other reply is inspected and routed on when its GUID is in the
// GUID cache.
void CGnuSock::Recieve_QueryReply(packet_QueryReply *QueryReply, DWORD length)
{
    if(logQueryReplies)
        Log += "Query Hit";

    // Out of order, but this bandaid gets search results only for us
    // If the query result is coming from more than 7 hops away
    key_data *ourSearch = GnuComm->SendTable.FindValue(&QueryReply->Header.Guid);

    byte *rawReply  = (byte *) QueryReply;
    GUID *pClientID = (GUID *) &rawReply[length - 16];

    if(ourSearch != NULL)
    {
        // test against search filter
        if ( !CIPFilter::AllowIP( QueryReply->Host ) )
        {
            if(logQueryReplies)
                Log += " dropped (filtered)\r\r\n";
            return;
        }

        if(logQueryReplies)
            Log += " recieved\r\r\n";
        GnuComm->Doc->UpdateSearchViews(QueryReply);
        GnuComm->HashClientTable.Insert(pClientID, this);
        return;
    }

    // Now go along and inspect it
    if(!InspectPacket((packet_Header *) QueryReply, length))
    {
        if(logQueryReplies)
            Log += ", dropped (hops expired)\r\r\n";
        return;
    }

    const bool nullHost = QueryReply->Host.a == 0 && QueryReply->Host.b == 0 &&
                          QueryReply->Host.c == 0 && QueryReply->Host.d == 0;
    if(nullHost)
    {
        if(logQueryReplies)
            Log += ", dropped (null host)\r\r\n";
        return;
    }

    if(QueryReply->TotalHits == 0)
    {
        if(logQueryReplies)
            Log += ", dropped (nothing found)\r\r\n";
        return;
    }

    // Check its TTL and pass it along
    if(QueryReply->Header.TTL == 0)
    {
        if(logQueryReplies)
            Log += ", dropped (TTL expired)\r\r\n";
        return;
    }
    QueryReply->Header.TTL--;

    // Look the GUID up in the cache to find where to route the reply
    CGnuControl::GuidHash::iterator cached = GnuComm->m_cache.find (QueryReply->Header.Guid);
    // key_data* key = GnuComm->HashTable.FindValue(&QueryReply->Header.Guid);
    if(cached == GnuComm->m_cache.end ())
    {
        if(logQueryReplies)
            Log += ", dropped (routing)\r\r\n";
        return;
    }

    key_data route;
    route.Guid   = (*cached).first;
    route.Origin = (*cached).second;
    GnuComm->Route_QueryReply(QueryReply, length, &route);

    if(logQueryReplies)
        Log += "\r\r\n";
}
// Handles a packet whose function code is not recognised: it is logged (when
// logUnknowns is on) and run through InspectPacket(), nothing else.
//   pBuff  - start of the packet; treated as a packet_Header
//   length - total packet size in bytes
void CGnuSock::Recieve_Unknown(byte *pBuff, int length)
{
    if(logUnknowns)
        Log += "Unknown, dropped";

    // InspectPacket() always runs; the log line is only terminated when it
    // reports that the packet is still alive.
    const bool alive = InspectPacket((packet_Header *) pBuff, length);
    if(alive && logUnknowns)
        Log += ",\r\r\n";
}
void CGnuSock::Send_Ping()
{
GUID Guid = GUID_NULL;
::CoCreateGuid(&Guid);
if (Guid == GUID_NULL)
{
AfxMessageBox("Failed to create a GUID to send.");
return;
}
packet_Ping Ping;
Ping.Header.Guid = Guid;
Ping.Header.Function = 0;
Ping.Header.Hops = 0;
Ping.Header.TTL = 7;
Ping.Header.Payload = flipX( makeX(0) );
GnuComm->m_cache[Guid] = NULL;
GnuComm->SendTable.Insert(&Guid, NULL);
Send(&Ping, 23);
GnuComm->BytesOut += 23;
m_dwNumOfSentPackets++;
}
// Answers a ping with a pong on the same socket the ping arrived on.
//   Guid - GUID of the ping being answered (copied into the pong header)
//   Hops - hop count of the ping; used as the pong's TTL
// The pong carries our port and address (the forced IP when one is set,
// otherwise the local host) and the totals of the shared-directory file
// counts and sizes.
void CGnuSock::Send_Pong(GUID Guid, int Hops)
// Send init responses out the same socket from which they came
{
    // Get the local address of this machine
    DWORD localPort = GnuComm->localPort;
    IP Host;
    if( IPtoStr(GnuComm->Doc->m_ForceIP) != "0.0.0.0")
        Host = GnuComm->Doc->m_ForceIP;
    else
        Host = StrtoIP(GnuComm->localHost);

    // Get the current file count and total size
    DWORD FileCount = 0;
    DWORD FileSize = 0;
    std::vector<DWORD>::iterator itCount = GnuComm->Doc->SharedDirCount.begin();
    std::vector<DWORD>::iterator itSize  = GnuComm->Doc->SharedDirSize.begin();

    // Fix: the original condition joined the two tests with the comma
    // operator, so only itSize was checked and itCount could run past the end
    // of SharedDirCount when the vectors differ in length.
    while(itCount != GnuComm->Doc->SharedDirCount.end() &&
          itSize  != GnuComm->Doc->SharedDirSize.end())
    {
        FileCount += *itCount;
        FileSize  += *itSize;
        ++itCount;
        ++itSize;
    }

    // Build the packet (on the stack; no heap allocation is needed)
    packet_Pong Pong;
    Pong.Header.Guid = Guid;
    Pong.Header.Function = 0x01;
    Pong.Header.TTL = Hops;
    Pong.Header.Hops = 0;
    Pong.Header.Payload = flipX( makeX(14) );
    Pong.Port = (WORD) localPort;
    Pong.Host = Host;
    Pong.FileCount = flipX( makeX(FileCount) );
    Pong.FileSize = flipX( makeX(FileSize) );

    // 23-byte header + 14-byte payload
    Send(&Pong, 37);
    GnuComm->BytesOut += 37;
    m_dwNumOfSentPackets++;
}
// Sends a search query on this socket.
//   pSearch - search view supplying the GUID (myGuid), the speed field
//             (BytesPerSec), the keyword and its length
// The packet is laid out as 25 bytes copied from a packet_Query (header plus
// speed field), then the keyword bytes, then a terminating NUL.
void CGnuSock::Send_Query(CViewSearch *pSearch)
{
    BYTE QueryByte[23 + 255];

    const int kPrefixSize = 25;   // bytes copied from the packet_Query struct
    const int kMaxKeyword = (int) sizeof(QueryByte) - kPrefixSize - 1;

    // Fix: the keyword was copied with no bounds check, so a long search
    // string overran the stack buffer. Clamp to what the buffer can hold.
    int keywordLen = (int) pSearch->length;
    if (keywordLen < 0)
        keywordLen = 0;
    if (keywordLen > kMaxKeyword)
        keywordLen = kMaxKeyword;

    packet_Query Query;
    Query.Header.Guid = pSearch->myGuid;
    Query.Header.Function = 0x80;
    Query.Header.Hops = 0;
    Query.Header.TTL = 7;
    // Payload = speed field + keyword + terminating NUL
    Query.Header.Payload = flipX( makeX(keywordLen + 3) );
    Query.Speed = pSearch->BytesPerSec;

    memcpy(QueryByte, &Query, kPrefixSize);

    // Add Search. The index is declared in this loop (the original reused the
    // index of an earlier loop, which only compiles with VC6 scoping rules).
    // If the keyword string is shorter than the stated length, pad with
    // zeros instead of sending uninitialised stack bytes.
    const int available = pSearch->keyword.GetLength();
    for(int i = 0; i < keywordLen; i++)
        QueryByte[kPrefixSize + i] = (i < available) ? (BYTE) pSearch->keyword.GetAt(i) : (BYTE) 0;
    QueryByte[kPrefixSize + keywordLen] = '\0';

    // Send the query
    Send(QueryByte, kPrefixSize + 1 + keywordLen);
    GnuComm->BytesOut += kPrefixSize + 1 + keywordLen;
    m_dwNumOfSentPackets++;
}
void CGnuSock::CleanUp()
{
POSITION pos;
CString HostPort = Host;
Log += "*** Connection with node closed.\r\r\n";
HostPort += ":";
HostPort += Port;
pos = GnuComm->Doc->Connections.Find(HostPort);
if(pos != NULL)
{
GnuComm->Doc->Connections.RemoveAt(pos);
GnuComm->Doc->QueueDisconnect.AddTail(HostPort);
}
Connected = 0;
}
// Clears the per-hop remote host counters and sends a new ping so that the
// pongs it triggers refill them.
void CGnuSock::RefreshHostCount()
{
    int slot = 0;
    while (slot < 8)
    {
        RemoteHosts[slot] = 0;
        ++slot;
    }

    Send_Ping();
}
// Returns the sum of the eight per-hop remote host counters.
DWORD CGnuSock::TotalRemoteHosts()
{
    DWORD sum = 0;
    int slot = 0;
    while (slot < 8)
    {
        sum += RemoteHosts[slot];
        ++slot;
    }
    return sum;
}
// Starts an orderly disconnect: when the socket handle is still valid, only
// close notifications are requested from now on and the sending side is shut
// down. The Connected flag is cleared in every case.
void CGnuSock::WantToDisconnect()
{
    // Let spock know that he is going to die, so it might as well
    // speed up the process.
    const bool socketOpen = (m_hSocket != INVALID_SOCKET);
    if(socketOpen)
    {
        AsyncSelect(FD_CLOSE);
        ShutDown(sends);
    }

    Connected = false;
}
////////////////////////////////////////////////////////////
// author: Nathan Brown
//
// Called from CGnuControl::QueriesSearchThread to send results
void CGnuSock::SendFileResults(std::queue <QueryResult> & resultQueue, GUID & Origin)
{
try // socket may have died
{
CGnucleusDoc *Doc = GnuComm->Doc;
int hits = resultQueue.size();
WORD localPort = GnuComm->localPort;
IP Host;
if( IPtoStr(Doc->m_ForceIP) != "0.0.0.0")
Host = Doc->m_ForceIP;
else
Host = StrtoIP(GnuComm->localHost);
DWORD Speed;
if(Doc->m_ConnectSpeed)
Speed = Doc->m_ConnectSpeed;
else
Speed = Doc->m_EstSpeed;
BYTE *packet = new BYTE[67075];
packet_QueryReply *Reply = (packet_QueryReply *) packet;
Reply->Header.Guid = Origin;
Reply->Header.Function = 0x81;
Reply->Header.TTL = 7;
Reply->Header.Hops = 0;
//Reply->Header.Payload = flipX( makeX(length - 23) );
Reply->TotalHits = hits;
Reply->Port = localPort;
Reply->Host = Host;
Reply->Speed = flipX( makeX(Speed));
DWORD pos = 34;
while (!resultQueue.empty ())
{
const QueryResult& tmpReply = resultQueue.front (); // Gives us a reference to the top of the queue
DWORD dwSize = tmpReply.GetSize ();
tmpReply.CopyToBuffer (packet + pos, dwSize);
pos += dwSize;
resultQueue.pop ();
}
GUID Guid = GnuComm->ClientGUID;
::memcpy (packet + pos, &Guid, sizeof (Guid));
// Set the payload
Reply->Header.Payload = flipX( makeX(pos - 7));
Send(packet, pos + 16);
GnuComm->BytesOut += (pos + 16);
m_dwNumOfSentPackets++;
delete [] packet;
}
catch( ... )
{
return;
}
}
//: Override to Limit outgoing packets
// Send override that can hold outgoing data back.
//   lpBuf   - data to send
//   nBufLen - number of bytes in lpBuf
//   nFlags  - passed to CAsyncSocket::Send on the direct path
// With no outgoing allotment set (m_dwByteAllottmentOut == 0) the data goes
// straight to CAsyncSocket::Send. Otherwise a copy is appended to the
// outgoing queue. Either way the queue is then processed.
// Returns the allotment minus the bytes-out counter, or 1 when that is not
// positive.
int CGnuSock::Send( const void* lpBuf, int nBufLen, int nFlags )
{
    if(!m_dwByteAllottmentOut)
    {
        // just send like normal
        CAsyncSocket::Send(lpBuf, nBufLen, nFlags);
        m_dwBytesOut += nBufLen;
    }
    else
    {
        // The caller's buffer may not outlive this call, so queue a copy
        void *queuedCopy = new BYTE[nBufLen];
        memcpy(queuedCopy, lpBuf, nBufLen);

        // only one thread can access this queue at once.
        m_CritSecPacketQueue.Lock();
        m_PacketsOutQueue.push(queuedCopy);
        m_PacketsOutQueueSize.push(nBufLen);
        m_CritSecPacketQueue.Unlock();
    }

    ProccessPacketQueue();

    // should be at least 1 if no problems
    const int remaining = m_dwByteAllottmentOut - m_dwBytesOut;
    return (remaining > 0) ? remaining : 1;
}
int CGnuSock::ProccessPacketQueue()
{
int n_buf_len = 0;
void * lp_buff = NULL;
m_CritSecPacketQueue.Lock();
while( (m_dwBytesOut < m_dwByteAllottmentOut || !m_dwByteAllottmentOut)
&& !m_PacketsOutQueue.empty())
{
lp_buff = m_PacketsOutQueue.front();
m_PacketsOutQueue.pop();
n_buf_len = m_PacketsOutQueueSize.front();
m_PacketsOutQueueSize.pop();
CAsyncSocket::Send(lp_buff, n_buf_len);
m_dwBytesOut += n_buf_len;
delete lp_buff;
}
m_CritSecPacketQueue.Unlock();
m_iSizeOfPacketQueue = m_PacketsOutQueue.size();
// return weather empty or not.
return m_iSizeOfPacketQueue;
}